﻿using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace RabbitMQDemo
{
    public partial class WorkQueues : Form
    {
        private string queueName = "WorkQueues_queue";
        Action<string, TextBox> SetText;
        private readonly static WorkQueues _WorkQueues;
        static WorkQueues()
        {
            _WorkQueues = new WorkQueues();
        }
        /// <summary>
        /// 单例模式
        /// </summary>
        public static WorkQueues SingleForm { get { return _WorkQueues; } }
        private WorkQueues()
        {
            CheckForIllegalCrossThreadCalls = false;
            InitializeComponent();
            ReceiveMsg(txtConsumer1);//消费者1
            ReceiveMsg(txtConsumer2);//消费者2
            SetText += OnSetText;
        }

        private void btnSendMsg_Click(object sender, EventArgs e)
        {
            SendMsg();
        }
        /// <summary>
        /// 发送消息
        /// </summary>
        private void SendMsg()
        {
            string message = txtPublisher.Text;
            if (message.Trim().Length <= 0)
            {
                MessageBox.Show("请输入要发送的消息");
            }
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                //durable-队列持久化,为true的时候告诉rabbitmq不管服务器停止运行还是崩溃或重启都不能丢失队列queue(已经声明过的队列无法再重新定义durable的值,如果这样做了,服务器会返回一个错误)
                channel.QueueDeclare(queue: queueName,
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                var body = Encoding.UTF8.GetBytes(message);

                //消息持久化-为true的时候告诉rabbitmq不管服务器停止运行还是崩溃或重启都不能丢失队列queue,这种消息持久化方式不能保证消息永不丢失,它告诉rabbitmq将消息保存到磁盘,但是当rabbitmq接收到消息但是还没有保存的时候有一个很短的时间窗口.此外,rabbitmq不会为每个消息执行fsync(2)-它可能只是保存到缓存中,而不是真正的写入磁盘.这种消息持久化方式保证不强,用于要求不那么严格的任务队列,已经足够,如果要更完善的消息持久化,可以使用publisher confirms
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;

                channel.BasicPublish(exchange: "",
                                     routingKey: queueName,
                                     basicProperties: properties,
                                     body: body);
            }
        }
        /// <summary>
        /// 接收消息
        /// </summary>
        private void ReceiveMsg(TextBox box)
        {
            try
            {
                var factory = new ConnectionFactory() { HostName = "localhost" };
                var connection = factory.CreateConnection();
                var channel = connection.CreateModel();

                channel.QueueDeclare(
                    queue: queueName,
                    durable: true,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);

                //公平分发
                //默认情况下rabbitmq会将消息平均地发给消费者,不管消费者是否正在处理消息,也不会考虑消费者是否返回未确认消息,这会造成资源浪费.假如队列中有10条消息,有2个消费者,那么每个消费者会收到5条消息,如果说其中5条消息是要花费大量时间的,并且这5条消息都刚好发送给了第一个消费者，那么第1个消费者将会一直处于忙碌状态,而第二个消费者假如处理的另外5条消息都不花费多少时间那么第二个消费者就会闲置.prefetchCount=1告诉rabbitmq不要向我发送新的消息,直到我处理并确认了前一个消息,然后rabbitmq将会把消息发给下一个消费者
                //假如所有的消费者都处于忙碌状态（没有反馈消息确认给rabbitmq）那么消息将在队列中排队等待,这可能造成queue存储空间不足,需要采取预防措施
                channel.BasicQos(
                    prefetchSize: 0,
                    prefetchCount: 1,
                    global: false);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                  {
                      var msg = Encoding.UTF8.GetString(ea.Body);

                      //要求发送的消息包含'.',每个'.'让线程花1秒时间处理,10个花10秒
                      int dots = msg.Split('.').Length - 1;
                      Thread.Sleep(dots * 1000);

                      //将消息显示在界面上
                      box.Invoke(SetText, msg, box);

                      //手动想rabbitmq发送消息确认
                      channel.BasicAck(
                          deliveryTag: ea.DeliveryTag,
                          multiple: false);
                  };

                //消息确认
                //noAck-自动回复消息 为true的时候consumer收到了一个消息就会立刻返回一个标记给rabbitmq告诉它这个消息我已经拿到了，你可以自由的删除掉它，至于consumer怎么处理这个消息或者处理的过程中出错了在rabbitmq中将找不回这个消息（使用这种方式要确保consumer不会挂掉，否则消息容易丢失）
                channel.BasicConsume(
                    queue: queueName,
                    noAck: false,
                    consumer: consumer);
            }
            catch (Exception ex)
            {
                MessageBox.Show(ex.ToString());
            }
        }

        private void OnSetText(string text, TextBox box)
        {
            box.Text += string.Format("{0}\r\n", text);
        }
    }
}
